Nacos Server原理

服务注册:Nacos Client通过发送REST请求向Nacos Server注册自己的服务,并提供自身的元数据,如IP地址、端口等信息。Nacos Server接收到注册请求后,将这些元数据存储在一个双层的内存Map中(Map<namespaceId, Map<serviceName, Service>>)。

服务心跳:在服务注册后,Nacos Client会维护一个定时心跳来持续通知Nacos Server,说明服务一直处于可用状态,防止被剔除。默认5s发送一次心跳

服务健康检查:Nacos Server会开启一个定时任务用来检查注册服务实例的健康情况,对于超过15s没有收到客户端心跳的实例会将它的healthy属性置为false,若某个实例超过30s没收到心跳,直接剔除该实例,被剔除实例若恢复发送心跳则重新注册

服务发现:Nacos Client调用服务提供者服务时,会发送一个REST请求给Nacos Server,获取注册服务清单,且缓存在Nacos Client本地,同时会在Nacos Client本地开启一个定时任务定时拉取服务端最新注册表信息更新到本地缓存

服务同步:Nacos Server集群之间会互相同步服务实例,用来保证服务信息的一致性。

Nacos服务注册表结构示例

服务注册

客户端注册服务时调用的是InstanceController#register,它最终调用ServiceManager#registerInstance注册服务实例:首先判断当前命名空间下该服务名对应的服务是否存在,若不存在则创建一个服务,将其放入服务注册表serviceMap中,为其添加心跳健康检查任务并注册RecordListener;最后将当前实例添加到该服务中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
 * HTTP endpoint for instance registration.
 *
 * <p>Reads the (optional) namespace and the (required) service name from the request, validates
 * the service name format, builds an Instance from the request parameters and hands it to the
 * ServiceManager.
 *
 * @return the literal {@code "ok"} on success
 */
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // Namespace falls back to the default one; the service name must be present.
    final String ns = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String svcName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(svcName);

    final Instance toRegister = parseInstance(request);
    serviceManager.registerInstance(ns, svcName, toRegister);
    return "ok";
}
/**
 * Registry manager (excerpt): owns the in-memory service registry and the registration flow.
 */
public class ServiceManager implements RecordListener<Service> {
    /** In-memory registry: namespaceId -> (serviceName -> Service). */
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    /**
     * Registers {@code instance} under {@code serviceName} in {@code namespaceId}, creating the
     * Service first when it does not exist yet, then adding the instance to it.
     *
     * @throws NacosException with INVALID_PARAM when the service cannot be found after creation
     */
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        final boolean ephemeral = instance.isEphemeral();
        createEmptyService(namespaceId, serviceName, ephemeral);
        final Service registered = getService(namespaceId, serviceName);
        if (registered == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // Put the instance into the registry and replicate it to the other server members.
        addInstance(namespaceId, serviceName, ephemeral, instance);
    }

    /**
     * Creates an empty Service (without any cluster) if none exists yet.
     * registerInstance passes the instance's ephemeral flag as {@code local}.
     */
    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
        createServiceIfAbsent(namespaceId, serviceName, local, null);
    }

    /**
     * Creates and registers a Service when none exists for (namespaceId, serviceName).
     *
     * <p>The new Service gets its name, namespace, group and modification time, has its checksum
     * computed, optionally receives {@code cluster}, is validated and is then put into the registry
     * via putServiceAndInit (which also starts its health checks and registers its listeners). When
     * {@code local} is false (persistent / CP) it is additionally passed to addOrReplaceService.
     */
    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
        if (getService(namespaceId, serviceName) != null) {
            return; // already registered in this namespace
        }
        final Service created = new Service();
        created.setName(serviceName);
        created.setNamespaceId(namespaceId);
        created.setGroupName(NamingUtils.getGroupName(serviceName));
        created.setLastModifiedMillis(System.currentTimeMillis());
        created.recalculateChecksum();
        if (cluster != null) {
            // createEmptyService passes null, so a Service created that way starts with no clusters.
            cluster.setService(created);
            created.getClusterMap().put(cluster.getName(), cluster);
        }
        created.validate();
        putServiceAndInit(created);
        if (!local) {
            addOrReplaceService(created);
        }
    }

    /** Looks up a Service; returns null when either the namespace or the service is unknown. */
    public Service getService(String namespaceId, String serviceName) {
        final boolean namespaceKnown = serviceMap.get(namespaceId) != null;
        return namespaceKnown ? chooseServiceMap(namespaceId).get(serviceName) : null;
    }
}

首先通过putService方法将当前Service添加到注册表中,然后通过Service#init为当前服务注册AP模式(临时实例)的心跳检查任务ClientBeatCheckTask,该任务延迟5s后首次执行,之后每隔5s执行一次。CP模式(持久化实例)的健康检查任务HealthCheckTask由Cluster#init创建;由于新建服务时clusterMap为空,该任务实际上是在首次添加实例、创建对应集群时才被创建的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * Puts {@code service} into the registry, starts its health checking via Service#init and
 * subscribes it, as a RecordListener, to both its ephemeral and its persistent instance-list keys.
 * The Notifier then calls the Service's onChange/onDelete when data under those keys changes.
 */
private void putServiceAndInit(Service service) throws NacosException {
    putService(service);
    // When called for a Service created without a cluster, its cluster map is still empty, so
    // init() only schedules the client-beat check; no per-cluster HealthCheckTask is created yet.
    service.init();
    // Ephemeral (AP) key first, then persistent (CP) key.
    for (boolean ephemeral : new boolean[] {true, false}) {
        final String listKey = KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral);
        consistencyService.listen(listKey, service);
    }
}
/**
 * Adds {@code service} to the registry under its namespace and name, replacing any previous entry.
 * The per-namespace map (a ConcurrentSkipListMap) is created lazily; creation is guarded by
 * putServiceLock with a re-check so that concurrent callers create it only once.
 */
public void putService(Service service) {
    final String ns = service.getNamespaceId();
    if (!serviceMap.containsKey(ns)) {
        synchronized (putServiceLock) {
            // Re-check under the lock: another thread may have created the map in the meantime.
            if (!serviceMap.containsKey(ns)) {
                serviceMap.put(ns, new ConcurrentSkipListMap<>());
            }
        }
    }
    serviceMap.get(ns).put(service.getName(), service);
}
/**
 * Naming-side Service (excerpt showing health-check initialization).
 */
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    /**
     * Starts health checking for this service.
     *
     * <p>Schedules the ClientBeatCheckTask, which checks client heartbeats of the ephemeral (AP)
     * instances. It then binds every existing cluster to this service and initializes it; Cluster#init
     * creates and schedules that cluster's HealthCheckTask (CP mode), at most once per cluster.
     */
    public void init() {
        // AP mode: periodic client-heartbeat check over the ephemeral instances.
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init(); // creates the cluster's CP-mode HealthCheckTask (no-op if already done)
        }
    }
}
/**
 * Scheduler for naming health-check tasks (excerpt).
 */
public class HealthCheckReactor {
    /**
     * Schedules {@code task} on the naming-health executor with an initial delay and period of
     * 5000 ms, at most once per {@code task.taskKey()}.
     *
     * <p>{@code putIfAbsent(key, schedule(...))} would evaluate its argument, and therefore schedule
     * the task, even when a future is already registered for the key. The extra periodic task would
     * then run forever without being tracked, so it could never be cancelled. computeIfAbsent only
     * schedules when the key is absent. The registration is atomic only if futureMap is a
     * ConcurrentMap; presumably it is, so confirm.
     */
    public static void scheduleCheck(ClientBeatCheckTask task) {
        futureMap.computeIfAbsent(task.taskKey(),
                key -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }
}
/**
 * Naming-side Cluster (excerpt showing health-check initialization).
 */
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    /**
     * Creates this cluster's CP-mode HealthCheckTask and hands it to HealthCheckReactor.
     * Only the first call has an effect; later calls return without doing anything.
     */
    public void init() {
        if (!inited) {
            checkTask = new HealthCheckTask(this);
            HealthCheckReactor.scheduleCheck(checkTask);
            inited = true;
        }
    }
}

ClientBeatCheckTask只在负责该服务的服务端节点上执行(由DistroMapper#responsible判断)。它首先获取该Service下的所有临时实例,然后逐个判断:若当前时间减去客户端最后一次心跳的时间大于15s,则将该实例的健康状态置为false;若大于30s,则异步调用自身的HTTP接口/v1/ns/instance(DELETE请求)将其剔除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
 * Periodic heartbeat check for the ephemeral (AP) instances of one service.
 */
public class ClientBeatCheckTask implements Runnable {
    /**
     * Runs one check round.
     *
     * <p>The round is skipped unless this server node is responsible for the service (distro
     * mapping) and health checking is enabled. It then works in two passes; marked instances are
     * never touched in either pass.
     * <ol>
     *   <li>Instances whose last beat is older than their heartbeat timeout (15 s by default, per
     *       the article) are marked unhealthy. The push service is notified and an
     *       InstanceHeartbeatTimeoutEvent is published.</li>
     *   <li>If instance expiry is enabled, instances whose last beat is older than their delete
     *       timeout (30 s by default, per the article) are deregistered via deleteIp.</li>
     * </ol>
     */
    public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName()) || !getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }
            final List<Instance> ephemeralIps = service.allIPs(true);

            // Pass 1: healthy -> unhealthy on heartbeat timeout.
            for (Instance ip : ephemeralIps) {
                final boolean beatExpired = System.currentTimeMillis() - ip.getLastBeat() > ip.getInstanceHeartBeatTimeOut();
                if (beatExpired && !ip.isMarked() && ip.isHealthy()) {
                    ip.setHealthy(false);
                    getPushService().serviceChanged(service);
                    ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, ip));
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }
            // Pass 2: remove instances silent for longer than the delete timeout.
            for (Instance ip : ephemeralIps) {
                if (!ip.isMarked() && System.currentTimeMillis() - ip.getLastBeat() > ip.getIpDeleteTimeout()) {
                    deleteIp(ip);
                }
            }
        } catch (Exception ignored) {
            // NOTE(review): swallowed silently. This is presumably intentional for a periodic task,
            // because an exception escaping a ScheduledExecutorService periodic task suppresses its
            // later runs. Logging it would still help diagnosis.
        }
    }
}
/**
 * Collects the ephemeral ({@code true}) or persistent ({@code false}) instances of every cluster
 * of this service into a new, caller-owned list.
 */
public List<Instance> allIPs(boolean ephemeral) {
    List<Instance> collected = new ArrayList<>();
    for (Cluster cluster : clusterMap.values()) {
        collected.addAll(cluster.allIPs(ephemeral));
    }
    return collected;
}
/**
 * Returns a snapshot copy of this cluster's ephemeral (AP) or persistent (CP) instance set.
 * Mutating the returned list does not affect the cluster.
 */
public List<Instance> allIPs(boolean ephemeral) {
    if (ephemeral) {
        return new ArrayList<>(ephemeralInstances);
    }
    return new ArrayList<>(persistentInstances);
}
/**
 * Asynchronously deregisters an expired ephemeral instance by sending an HTTP DELETE to this
 * server's own {@code .../instance} endpoint, with the instance's ip, port, cluster, service and
 * namespace as query parameters.
 *
 * <p>Best effort only: the response callbacks are empty and any exception while building or
 * sending the request is ignored.
 */
private void deleteIp(Instance instance) {
    try {
        NamingProxy.Request request = NamingProxy.Request.newRequest();
        request.appendParam("ip", instance.getIp())
                .appendParam("port", String.valueOf(instance.getPort()))
                .appendParam("ephemeral", "true")
                .appendParam("clusterName", instance.getClusterName())
                .appendParam("serviceName", service.getName())
                .appendParam("namespaceId", service.getNamespaceId());

        String url = new StringBuilder("http://")
                .append(IPUtil.localHostIP())
                .append(IPUtil.IP_PORT_SPLITER)
                .append(EnvUtil.getPort())
                .append(EnvUtil.getContextPath())
                .append(UtilsAndCommons.NACOS_NAMING_CONTEXT)
                .append("/instance?")
                .append(request.toUrl())
                .toString();

        HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
            @Override
            public void onReceive(RestResult<String> result) {
                // result intentionally ignored
            }

            @Override
            public void onError(Throwable throwable) {
                // failure intentionally ignored; the instance will be checked again next round
            }

            @Override
            public void onCancel() {
                // nothing to do
            }
        });
    } catch (Exception ignored) {
        // NOTE(review): best-effort deletion; the error is dropped silently.
    }
}
/**
 * Maps services to the server node responsible for them under the distro protocol (excerpt).
 */
public class DistroMapper extends MemberChangeListener {
    /**
     * Tells whether this server node is responsible for {@code serviceName}.
     *
     * @return {@code true} when distro is disabled, the server runs standalone, or this node is
     *     not in the healthy member list; {@code false} when the healthy member list is empty;
     *     otherwise whether {@code distroHash(serviceName) % memberCount} lies between this node's
     *     first and last position in the healthy list
     */
    public boolean responsible(String serviceName) {
        final List<String> healthyMembers = healthyList;
        if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
            return true;
        }
        if (CollectionUtils.isEmpty(healthyMembers)) {
            return false;
        }
        final String self = EnvUtil.getLocalAddress();
        final int first = healthyMembers.indexOf(self);
        final int last = healthyMembers.lastIndexOf(self);
        if (first < 0 || last < 0) {
            return true; // this node is not in the healthy list
        }
        final int slot = distroHash(serviceName) % healthyMembers.size();
        return first <= slot && slot <= last;
    }
}

添加实例时,先从DataStore缓存中取出该服务的实例列表,再用注册表中的实例数据更新其健康状态和最后心跳时间,最后将新增实例放入该Map,得到最新的实例列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * Adds {@code ips} to a service. While holding the Service's monitor (which removeInstance also
 * locks), it computes the merged instance list and writes it through the consistency service.
 * The consistency service stores the list and replicates it to the other server members.
 */
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
    final String recordKey = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    final Service owner = getService(namespaceId, serviceName);
    synchronized (owner) {
        final List<Instance> merged = addIpAddresses(owner, ephemeral, ips);
        final Instances snapshot = new Instances();
        snapshot.setInstanceList(merged);
        consistencyService.put(recordKey, snapshot);
    }
}
/** Returns the service's instance list with {@code ips} merged in (the ADD action of updateIpAddresses). */
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    final String action = UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD;
    return updateIpAddresses(service, action, ephemeral, ips);
}
/**
 * Computes the instance list of {@code service} that results from applying {@code action} to {@code ips}.
 *
 * <p>It starts from the instance list cached in the consistency service under the service's
 * instance-list key. If such an entry exists, it is passed through setValid together with the live
 * registry instances (keyed by toIpAddr()). Presumably setValid refreshes health status and
 * last-beat time from the registry; it is not shown here, so confirm. Otherwise the computation
 * starts from an empty map.
 *
 * <p>Then, for every instance in {@code ips}:
 * <ul>
 *   <li>A missing cluster is created and initialized (scheduling its health-check task), even
 *       when removing.</li>
 *   <li>REMOVE drops the entry with the same datum key.</li>
 *   <li>Any other action adds or replaces the entry, reusing the existing instanceId or
 *       generating a new one.</li>
 * </ul>
 *
 * @param service   the service whose instances are updated
 * @param action    UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE removes; anything else
 *                  (normally UPDATE_INSTANCE_ACTION_ADD) takes the add path
 * @param ephemeral whether to work on the ephemeral (AP) or the persistent (CP) instance list
 * @param ips       the instances to add or remove
 * @return a new list holding the resulting instances
 * @throws IllegalArgumentException if an ADD leaves the list empty. NOTE(review): the message
 *         serializes instanceMap.values(), which is always empty at that point.
 */
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
// Instance list currently cached in the consistency service for this service
Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
List<Instance> currentIPs = service.allIPs(ephemeral); // instances currently in the registry
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
Set<String> currentInstanceIds = Sets.newHashSet();
for (Instance instance : currentIPs) {
currentInstances.put(instance.toIpAddr(), instance);
currentInstanceIds.add(instance.getInstanceId()); // existing ids, consulted when generating a new instanceId
}
Map<String, Instance> instanceMap;
if (datum != null && null != datum.value) { // cached list exists: merge it with the live registry data via setValid
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
instanceMap = new HashMap<>(ips.length); // nothing cached yet: start from an empty map
}
for (Instance instance : ips) { // register/deregister in this article pass a single instance
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName(), service);
cluster.init(); // unknown cluster: create it and start its CP-mode health-check task
service.getClusterMap().put(instance.getClusterName(), cluster);
}
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey()); // removal: drop the entry by datum key
} else { // addition (any non-REMOVE action)
Instance oldInstance = instanceMap.get(instance.getDatumKey());
if (oldInstance != null) { // already known: keep its existing instanceId
instance.setInstanceId(oldInstance.getInstanceId());
} else { // new: generate an instanceId (the article says the default is ip#port#clusterName#serviceName)
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
}
instanceMap.put(instance.getDatumKey(), instance); // add or replace
}
}
// An ADD that leaves no instances at all is rejected
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils.toJson(instanceMap.values()));
}
return new ArrayList<>(instanceMap.values());
}

获取到最新的实例列表后,调用DistroConsistencyServiceImpl#put方法:先通过onPut将实例列表放入DataStore缓存,并向Notifier的阻塞队列添加一个变更任务,由Notifier异步调用Service#onChange更新本地注册表;然后通过distroProtocol.sync将该服务名下的全部实例同步给其他服务端成员。同步同样是通过异步任务加队列的方式调用HTTP接口完成的,其他服务端收到数据后最终也是通过onPut方法处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
 * Distro (AP) consistency service (excerpt): local storage of ephemeral instance lists plus
 * replication to the other server members.
 */
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    /**
     * Applies {@code value} locally (see onPut), then asks the distro protocol to replicate the
     * record for {@code key} to the other server members. The last argument is half the task-dispatch
     * period; presumably it is a delay before syncing, so confirm in DistroProtocol.
     */
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        final DistroKey distroKey = new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
        distroProtocol.sync(distroKey, DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
    }

    /**
     * Stores an ephemeral instance-list record in the DataStore. If a RecordListener is registered
     * for {@code key} (normally the Service, added in putServiceAndInit), it also queues a CHANGE
     * task on the Notifier, which updates the registry asynchronously.
     */
    public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            cacheEphemeral(key, value);
        }
        if (listeners.containsKey(key)) {
            notifier.addTask(key, DataOperation.CHANGE);
        }
    }

    /** Wraps {@code value} in a fresh Datum (timestamp incremented once) and stores it under {@code key}. */
    private void cacheEphemeral(String key, Record value) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }
}
/**
 * Single-consumer dispatcher that delivers record changes to the registered RecordListeners.
 */
public class Notifier implements Runnable {
    /**
     * Queues a notification task for {@code datumKey}.
     *
     * <p>CHANGE tasks are de-duplicated. While a CHANGE for the same key is still pending (tracked
     * in {@code services}), further CHANGE requests are dropped; the handler reads the latest datum
     * from the DataStore anyway. Tasks for any other action are always queued.
     */
    public void addTask(String datumKey, DataOperation action) {
        if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
            return;
        }
        if (action == DataOperation.CHANGE) {
            services.put(datumKey, StringUtils.EMPTY);
        }
        tasks.offer(Pair.with(datumKey, action));
    }

    /**
     * Consumer loop: takes tasks from the blocking queue and dispatches them until the thread is
     * interrupted.
     */
    public void run() {
        for (; ; ) {
            try {
                Pair<String, DataOperation> pair = tasks.take();
                handle(pair);
            } catch (InterruptedException e) {
                // take() clears the interrupt flag when it throws. Restore the flag and stop;
                // swallowing it would block again on take() and the thread could never be shut down.
                Thread.currentThread().interrupt();
                return;
            } catch (Throwable e) {
                // Keep the notifier alive: one failing task must not stop later notifications.
                // NOTE(review): swallowed silently here; consider logging.
            }
        }
    }

    /**
     * Delivers one task to every listener of its key: CHANGE calls onChange with the current
     * DataStore value, DELETE calls onDelete. Failures are isolated per listener.
     */
    private void handle(Pair<String, DataOperation> pair) {
        try {
            String datumKey = pair.getValue0();
            DataOperation action = pair.getValue1();
            // Clear the pending marker first so that a CHANGE arriving during dispatch is queued again.
            services.remove(datumKey);
            if (!listeners.containsKey(datumKey)) {
                return;
            }
            // Listeners are registered in putServiceAndInit via ConsistencyService#listen; each is a Service.
            for (RecordListener listener : listeners.get(datumKey)) {
                try {
                    if (action == DataOperation.CHANGE) {
                        listener.onChange(datumKey, dataStore.get(datumKey).value);
                    } else if (action == DataOperation.DELETE) {
                        listener.onDelete(datumKey);
                    }
                } catch (Throwable e) {
                    // One failing listener must not prevent the others from being notified.
                }
            }
        } catch (Throwable e) {
            // NOTE(review): swallowed silently; consider logging.
        }
    }
}

Notifier最终调用Service#onChange方法,替换注册表中各Cluster的实例列表。更新注册表内存时,为了避免读写并发冲突,大量运用了CopyOnWrite思想。同时还会通过PushService将服务变动以UDP的方式通知给订阅的客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
 * Naming-side Service (excerpt showing how instance-list changes are applied to the registry).
 */
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    /**
     * RecordListener callback with the latest instance list for {@code key}.
     *
     * <p>Clamps each instance's weight (above 10000 becomes 10000; positive and below 0.01 becomes
     * 0.01). It then replaces the registry's instance lists (ephemeral or persistent, depending on the
     * key) and recomputes the checksum.
     *
     * @throws RuntimeException if the list contains a null instance
     */
    public void onChange(String key, Instances value) throws Exception {
        for (Instance instance : value.getInstanceList()) {
            if (instance == null) {
                throw new RuntimeException("got null instance " + key);
            }
            if (instance.getWeight() > 10000.0D) {
                instance.setWeight(10000.0D);
            } else if (instance.getWeight() > 0.0D && instance.getWeight() < 0.01D) {
                instance.setWeight(0.01D);
            }
        }
        final boolean ephemeral = KeyBuilder.matchEphemeralInstanceListKey(key);
        updateIPs(value.getInstanceList(), ephemeral);
        recalculateChecksum();
    }

    /**
     * Groups {@code instances} by cluster and replaces each cluster's ephemeral or persistent
     * instance set. Null instances are skipped, an empty cluster name becomes the default cluster,
     * and unknown clusters are created and initialized. Every known cluster that receives no
     * instance is updated with an empty list. Finally the last-modified time is bumped and the push
     * service is notified.
     */
    public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        final Map<String, List<Instance>> byCluster = new HashMap<>(clusterMap.size());
        clusterMap.keySet().forEach(name -> byCluster.put(name, new ArrayList<>()));

        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    continue;
                }
                if (StringUtils.isEmpty(instance.getClusterName())) {
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }
                final String clusterName = instance.getClusterName();
                if (!clusterMap.containsKey(clusterName)) {
                    Cluster created = new Cluster(clusterName, this);
                    created.init();
                    getClusterMap().put(clusterName, created);
                }
                byCluster.computeIfAbsent(clusterName, k -> new LinkedList<>()).add(instance);
            } catch (Exception e) {
                // NOTE(review): a failing instance is skipped silently; the rest are still applied.
            }
        }

        for (Map.Entry<String, List<Instance>> entry : byCluster.entrySet()) {
            clusterMap.get(entry.getKey()).updateIps(entry.getValue(), ephemeral);
        }
        setLastModifiedMillis(System.currentTimeMillis());
        getPushService().serviceChanged(this);
    }
}

当服务实例发生变更时会发布ServiceChangeEvent事件,PushService监听到该事件后,延迟1s向订阅该服务的客户端发送UDP通知;若该服务已有尚未执行的推送任务,则不会重复发布事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
 * Pushes service changes to subscribed clients over UDP (excerpt).
 */
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
/**
 * Publishes a ServiceChangeEvent for {@code service}, unless a push for the same service is already
 * pending in futureMap. Changes that arrive while a push is scheduled are thus covered by that push.
 */
public void serviceChanged(Service service) {
if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
return; // a push for this service is already scheduled
}
this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}
/**
 * Handles a ServiceChangeEvent. It schedules a task that, after a 1000 ms delay, sends the current
 * service data over UDP to every live subscriber of the service. The pending future is tracked in
 * futureMap (which serviceChanged checks) and removed in the task's finally block.
 *
 * <p>NOTE(review): futureMap.put runs after the task is scheduled. If the task completed first, its
 * finally-remove would precede the put and leave a stale entry that suppresses later pushes for
 * this service. This is unlikely with a 1 s delay, but worth confirming.
 */
public void onApplicationEvent(ServiceChangeEvent event) {
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
Future future = GlobalExecutor.scheduleUdpSender(() -> { // runs after the 1000 ms delay given below; presumably a one-shot schedule, not every 1 s (confirm in GlobalExecutor)
try { // subscribers are recorded in clientMap when a client queries the service
ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return; // nobody subscribes to this service
}
Map<String, Object> cache = new HashMap<>(16); // per-run cache of prepared payloads, keyed by push-cache key
long lastRefTime = System.nanoTime();
for (PushClient client : clients.values()) {
if (client.zombie()) {
clients.remove(client.toString()); // drop subscribers reported as zombie
continue;
}
Receiver.AckEntry ackEntry;
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
// reuse the payload already prepared in this run for the same (service, ip, agent) key
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
}
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
udpPush(ackEntry); // send the UDP notification to the client
}
} catch (Exception e) {
// NOTE(review): failures are swallowed silently here
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); // allow the next push for this service
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
/**
 * Sends one UDP packet for {@code ackEntry} and schedules a Retransmitter after ACK_TIMEOUT_NANOS.
 *
 * @return the entry after sending. If the entry already exceeded MAX_RETRY_TIMES, it is dropped,
 *         counted as failed and returned unsent. Returns null when {@code ackEntry} is null or
 *         sending threw (then also counted as failed).
 */
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
if (ackEntry == null) {
return null;
}
if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) { // give up: forget the entry and count the push as failed
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return ackEntry;
}
try {
if (!ackMap.containsKey(ackEntry.key)) {
totalPush++; // counted only when the key is not already awaiting an ack (i.e. not a retry)
}
ackMap.put(ackEntry.key, ackEntry);
udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
udpSocket.send(ackEntry.origin);
ackEntry.increaseRetryTime();
// presumably Retransmitter re-sends if no ack arrived in time; confirm
GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
return ackEntry;
} catch (Exception e) {
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return null;
}
}
}

更新注册表中的实例列表是通过Cluster#updateIps方法完成的,它使用CopyOnWrite思想:先构造新的实例集合,再整体替换旧集合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/**
 * Naming-side Cluster (excerpt showing how its instance set is replaced).
 */
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    /**
     * Replaces this cluster's ephemeral or persistent instance set with {@code ips}.
     *
     * <ul>
     *   <li>Instances whose ip:port already existed but whose data changed take over the health
     *       status of their old counterpart, unless they are marked.</li>
     *   <li>Newly added instances get their health-check status reset.</li>
     *   <li>Removed instances have their health-check status cleared.</li>
     * </ul>
     *
     * <p>The set is replaced by a new HashSet instead of being mutated in place (copy-on-write), so
     * code still holding the old set does not observe a partial update.
     */
    public void updateIps(List<Instance> ips, boolean ephemeral) {
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
        Map<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
        for (Instance ip : toUpdateInstances) {
            oldIpMap.put(ip.getDatumKey(), ip);
        }

        // New instances whose ip:port already existed but whose content changed.
        List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
        for (Instance ip : updatedIPs) {
            Instance oldIP = oldIpMap.get(ip.getDatumKey());
            // updatedIps matches on ip:port while oldIpMap is keyed by datum key, so a miss cannot
            // be ruled out here. Without this guard an NPE would abort the whole update.
            if (oldIP == null) {
                continue;
            }
            if (!ip.isMarked()) {
                ip.setHealthy(oldIP.isHealthy()); // carry the old health status over to the new object
            }
        }

        // Instances only in the new list: added.
        for (Instance ip : subtract(ips, oldIpMap.values())) {
            HealthCheckStatus.reset(ip);
        }
        // Instances only in the old list: removed.
        for (Instance ip : subtract(oldIpMap.values(), ips)) {
            HealthCheckStatus.remv(ip);
        }

        // Copy-on-write: publish a new set instead of mutating the current one.
        toUpdateInstances = new HashSet<>(ips);
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }

    /**
     * Returns the instances of {@code newInstance} whose ip:port also occurs among the instances
     * common to both lists, but whose toString() form does not match an already-recorded old
     * instance. In other words, existing instances whose data changed. Unchanged instances are
     * not returned.
     */
    private List<Instance> updatedIps(Collection<Instance> newInstance, Collection<Instance> oldInstance) {
        // Instances present in both lists according to CollectionUtils.intersection (Instance equality).
        Collection<Instance> intersects = CollectionUtils.intersection(newInstance, oldInstance);
        Map<String, Instance> stringIpAddressMap = new HashMap<>(intersects.size());
        for (Instance instance : intersects) {
            stringIpAddressMap.put(instance.getIp() + ":" + instance.getPort(), instance);
        }

        // toString() -> 1 when seen once, 2 when the same toString() was already recorded
        // (normally: an unchanged instance present in both lists).
        Map<String, Integer> intersectMap = new HashMap<>(newInstance.size() + oldInstance.size());
        Map<String, Instance> updatedInstancesMap = new HashMap<>(newInstance.size());
        Map<String, Instance> newInstancesMap = new HashMap<>(newInstance.size());
        for (Instance instance : oldInstance) {
            if (stringIpAddressMap.containsKey(instance.getIp() + ":" + instance.getPort())) {
                intersectMap.put(instance.toString(), 1);
            }
        }
        for (Instance instance : newInstance) {
            if (stringIpAddressMap.containsKey(instance.getIp() + ":" + instance.getPort())) {
                if (intersectMap.containsKey(instance.toString())) {
                    intersectMap.put(instance.toString(), 2); // identical to an old instance: unchanged
                } else {
                    intersectMap.put(instance.toString(), 1); // same ip:port as an old instance, different content
                }
            }
            newInstancesMap.put(instance.toString(), instance);
        }

        for (Map.Entry<String, Integer> entry : intersectMap.entrySet()) {
            String key = entry.getKey();
            // A count of 1 that belongs to a new instance is a changed instance. Old-only strings
            // are not in newInstancesMap and are skipped.
            if (entry.getValue() == 1 && newInstancesMap.containsKey(key)) {
                updatedInstancesMap.put(key, newInstancesMap.get(key));
            }
        }
        return new ArrayList<>(updatedInstancesMap.values());
    }

    /**
     * Returns the instances of {@code oldIp} whose ip:port does not occur in {@code ips}, i.e. a set
     * difference keyed on ip:port. It is used both for added instances (new minus old) and for
     * removed instances (old minus new).
     */
    private List<Instance> subtract(Collection<Instance> oldIp, Collection<Instance> ips) {
        Map<String, Instance> ipsMap = new HashMap<>(ips.size());
        for (Instance instance : ips) {
            ipsMap.put(instance.getIp() + ":" + instance.getPort(), instance);
        }
        List<Instance> instanceResult = new ArrayList<>();
        for (Instance instance : oldIp) {
            if (!ipsMap.containsKey(instance.getIp() + ":" + instance.getPort())) {
                instanceResult.add(instance);
            }
        }
        return instanceResult;
    }
}

删除下线实例

已下线实例的删除发生在客户端心跳健康检查中:若实例超过30s没有心跳,则认为其已下线,通过异步调用/v1/ns/instance接口(DELETE)将其删除。该请求由InstanceController#deregister处理,最终通过substractIpAddresses方法将已下线的实例从实例列表中移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * HTTP endpoint for instance deregistration (also the target of the heartbeat checker's
 * self-call that removes expired instances).
 *
 * <p>Parses the instance and service coordinates from the request and validates the service name.
 * If the service exists, the instance is removed from it. Deregistering from an unknown service is
 * treated as success.
 *
 * @return the literal {@code "ok"}
 */
@CanDistro
@DeleteMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String deregister(HttpServletRequest request) throws Exception {
    final Instance target = getIpAddress(request);
    final String ns = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String svcName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(svcName);

    if (serviceManager.getService(ns, svcName) != null) {
        serviceManager.removeInstance(ns, svcName, target.isEphemeral(), target);
    }
    return "ok";
}
/**
 * Removes {@code ips} from a service while holding the Service's monitor. This serializes the
 * removal with addInstance, which locks the same object.
 * NOTE(review): throws NullPointerException if the service does not exist; deregister checks this first.
 */
public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
    final Service owner = getService(namespaceId, serviceName);
    synchronized (owner) {
        removeInstance(namespaceId, serviceName, ephemeral, owner, ips);
    }
}
/**
 * Computes the instance list remaining after removing {@code ips} and writes it through the
 * consistency service under the service's instance-list key.
 */
private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service, Instance... ips) throws NacosException {
    final List<Instance> remaining = substractIpAddresses(service, ephemeral, ips);
    final Instances payload = new Instances();
    payload.setInstanceList(remaining);
    consistencyService.put(KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral), payload);
}

最终调用updateIpAddresses方法,该方法既可以完成实例的添加也可以完成实例的删除。首先从缓存中获取该服务下的实例列表,然后从注册表中获取该服务下的实例列表,若存在缓存数据,则用注册表中的实例数据更新缓存中实例的健康状态和最后心跳时间,遍历传入的实例列表,判断若是删除实例,则直接移除。最终返回最新的服务实例列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * Registry manager (excerpt showing the instance-removal path).
 */
public class ServiceManager implements RecordListener<Service> {
/** Returns the service's instance list with {@code ips} removed (the REMOVE action of updateIpAddresses). */
private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips);
}
/**
 * Computes the instance list of {@code service} that results from applying {@code action} to {@code ips}.
 *
 * <p>It starts from the instance list cached in the consistency service for the service. If such an
 * entry exists, it is passed through setValid together with the live registry instances (keyed by
 * toIpAddr()). Presumably setValid refreshes health status and last-beat time; it is not shown
 * here, so confirm. Otherwise it starts from an empty map.
 *
 * <p>Then, for every instance in {@code ips}:
 * <ul>
 *   <li>A missing cluster is created and initialized, even when removing.</li>
 *   <li>REMOVE drops the entry with the same datum key.</li>
 *   <li>Any other action adds or replaces it, reusing the existing instanceId or generating a
 *       new one.</li>
 * </ul>
 *
 * @param service   the service whose instances are updated
 * @param action    UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE removes; anything else takes the add path
 * @param ephemeral whether to work on the ephemeral (AP) or the persistent (CP) instance list
 * @param ips       the instances to add or remove
 * @return a new list holding the resulting instances
 * @throws IllegalArgumentException if an ADD leaves the list empty. NOTE(review): the message
 *         serializes instanceMap.values(), which is always empty at that point.
 */
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
// Instance list currently cached in the consistency service for this service
Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
List<Instance> currentIPs = service.allIPs(ephemeral); // instances currently in the registry
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
Set<String> currentInstanceIds = Sets.newHashSet();
for (Instance instance : currentIPs) {
currentInstances.put(instance.toIpAddr(), instance);
currentInstanceIds.add(instance.getInstanceId()); // existing ids, consulted when generating a new instanceId
}
Map<String, Instance> instanceMap;
if (datum != null && null != datum.value) { // cached list exists: merge it with the live registry data via setValid
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
instanceMap = new HashMap<>(ips.length); // nothing cached yet: start from an empty map
}
for (Instance instance : ips) {
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName(), service);
cluster.init(); // unknown cluster: create and initialize it
service.getClusterMap().put(instance.getClusterName(), cluster);
}
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
instanceMap.remove(instance.getDatumKey()); // removal: drop the entry by datum key
} else { // addition (any non-REMOVE action)
Instance oldInstance = instanceMap.get(instance.getDatumKey());
if (oldInstance != null) { // already known: keep its existing instanceId
instance.setInstanceId(oldInstance.getInstanceId());
} else { // new: generate an instanceId
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
}
instanceMap.put(instance.getDatumKey(), instance); // add or replace
}
}
// An ADD that leaves no instances at all is rejected
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils.toJson(instanceMap.values()));
}
return new ArrayList<>(instanceMap.values());
}
}

最后通过DistroConsistencyServiceImpl#put方法将最新的实例列表添加到DataStore缓存中,然后通过异步任务替换注册表中的实例列表,并将实例数据同步给其他服务端成员。